Skip to content
标签
发布订阅
字数
2485 字
阅读时间
13 分钟

一、方式一

单发布者关联单订阅者,发布者创建时需关联订阅者,启动30没有消息则关闭发布消息线程,发送消息时再开启,发布订阅对象创建时需添加订阅者的对象。

发布者,声明订阅者和发布者接口,由处理程序实现发布者的接口

订阅者接口

java

/**
 * 订阅者模式 订阅者接口  <i>服务器规范接口</i> 
 * @author chenli
 * @date 2016-2-3
 * @version 1.0
 * @see 组织订阅接口{@link AbstractOrgSubscriber}
 * @see 用户订阅接口{@link AbstractUserSubscriber}
 * @see 用户账户安全配置订阅接口{@link AbstractUserSafetySubscriber }
 * @see 资源订阅接口 {@link AbstractAppSubscriber}
 * @see 资源的用户角色订阅接口 {@link AbstractAppUserRoleSubscriber}
 * @see 发布器{@link Publisher}
 */

@SuppressWarnings("rawtypes")
public interface ISubscriber<T extends IdEntity> extends IName,IPriority{
	
	/**
	 * 新增事件通知.
	 * <br>通知者已经使用生产者和消费者模式
	 * @param obj 新增的对象
	 * @param extend 对象的自定义属性 
	 * @param number 操作流水号
	 */
	public void insert(T obj,Map<String,String> extend, OptSerialNum number);
	
	/**
	 * 变更事件通知
	 * <br>通知者已经使用生产者和消费者模式
	 * @param obj 最新的对象
	 * @param extend 自定义属性。最新新值
	 * @param change 发生变更的数据。  包括用户扩展数据等用户基础信息
	 * @param number 操作流水号
	 */
	public void update(T obj, Map<String,String> extend, Map<String,Object> change, OptSerialNum number);
	
	/**
	 * 删除事件通知
	 * <br>通知者已经使用生产者和消费者模式
	 * @param obj 删除的对象
	 * @param number 操作流水号
	 */
	public void delete(T obj, OptSerialNum number);
	
	/**
	 * 是否使用异步通知
	 * @return
	 */
	public boolean isAsync();
}

发布者接口

java

/**
 * 订阅者模式 消息发布者  <i>服务器规范接口</i> 
 * @author chenli
 * @date 2016-2-3
 * @version 1.0
 * @see 多例模式使用spring管理实例对象{@link Publisher}
 */

@SuppressWarnings("rawtypes")
public interface IPublisher<T extends IdEntity> extends ISubscriber<T>{

	/**
	 * 增加订阅者
	 * @param subscriber
	 */
	public void addSubscriber(ISubscriber<T> subscriber);
	
	/**
	 * 删除订阅者
	 * @param subscriber
	 */
	public boolean delSubscriber(ISubscriber<T> subscriber);
	
	/**
	 * 返回订阅者的数量
	 * @return
	 */
	public int size();
}

发布者

发布者实现接口,内部包含订阅者集合,可以对订阅者进行添加、修改何删除。可定义多个发布者对应不同的订阅者。订阅者也可进行抽象成某一类型

java

/**
 * 建议使用spring多例模式管理,针对每一个类型(泛型)生成单例。
 * 使用生产者消费者模式。
 * @author chenli
 * @date 2016-2-3
 * @version 1.0
 */
@SuppressWarnings("rawtypes")
public class Publisher<T extends IdEntity> implements IPublisher<T>, Runnable{
	private final static Logger logger = LoggerFactory.getLogger(Publisher.class);
	private String name;
	private String desc;
	private boolean ischangeSubs = true;
	private ISubscriber<T>[] subs;
	
	private Vector<ISubscriber<T>> subscribers = new Vector<ISubscriber<T>>();
	private ReentrantLock Sublock = new ReentrantLock();
	private ReentrantLock msglock = new ReentrantLock();
	private ReentrantLock msglock2 = new ReentrantLock();
	//创建本线程
	private Thread publisherThread;
	//创建本线程
	private Thread publisherThread2;
	//通知队列
	private BlockingQueue<SubscriberMsg<T>> msgqueue = new LinkedBlockingQueue<SubscriberMsg<T>>(1024*5);
	
	@Override
	public void addSubscriber(ISubscriber<T> subscriber) {
		try {
			Sublock.lock();
			if(subscribers.contains(subscriber) == false){
				subscribers.add(subscriber);
				ischangeSubs = true;
			}
		} catch (Exception e) {
			// IGNORE
		}finally{
			Sublock.unlock();
		}
	}
	
	@Override
	public boolean delSubscriber(ISubscriber<T> subscriber) {
		try {
			Sublock.lock();
			if(subscribers.contains(subscriber)){
				boolean result = subscribers.remove(subscriber);
				if(result)ischangeSubs = true;
				return result;
			}
		} catch (Exception e) {
			// IGNORE
		}finally{
			Sublock.unlock();
		}
		return false;
	}
	
	@Override
	public int size() {
		return subscribers.size();
	}
	
	/**
	 * 获取对象的时候 返回copy对象,防止出现{@link ConcurrentModificationException}异常
	 * @return
	 * @author chenli
	 * @data 2016-2-3
	 */
	@SuppressWarnings("unchecked")
	protected ISubscriber<T>[] getSubscribers(){
		if(!ischangeSubs){
			return subs;
		}
		try {
			Sublock.lock();
			subs = new ISubscriber[subscribers.size()];
			Collections.sort(subscribers,new PriorityComparator());
			subs = subscribers.toArray(subs);
			ischangeSubs = false;
		} catch (Exception e) {
			// IGNORE
		}finally{
			Sublock.unlock();
		}
		return subs;
	}
	
	@Override
	public void run() {
		while(true){
			try {
				//30分钟没有则退出线程
				SubscriberMsg<T> msg = msgqueue.poll(30, TimeUnit.MINUTES);
				if(msg == null){
					break;
				}
				
				switch (msg.getType()) {
				case SubscriberMsg.TYPE_INSERT:
					noticeInsert(msg.getObj(), msg.getExtend(), msg.getNumber(), true);
					break;
				case SubscriberMsg.TYPE_UPDATE:
					noticeUpdate(msg.getObj(), msg.getExtend(), msg.getChange(), msg.getNumber(), true);
					break;
				case SubscriberMsg.TYPE_DELETE:
					noticeDelete(msg.getObj(), msg.getNumber(), true);
					break;
				default:
					logger.warn("发布者【{}】发布错误的消息类型", this.getName());
					break;
				}
				
			} catch (Exception e) {
				logger.error("发布者{}订阅者{},事件处理异常",getName(),SerializerUtil.serialize(getSubscribers()),e);
			}
		}
	}
	
	/**
	 * 使用生产者消费者方式通知订阅者。
	 * <br>可能对通知对象存在消息滞后。 (如后期增加的订阅者,可能也会通知之前的消息。)
	 */
	public void insert(T obj, java.util.Map<String,String> extend, OptSerialNum number) {
		runThread();
		SubscriberMsg<T> msg = new SubscriberMsg<T>();
		msg.setType(SubscriberMsg.TYPE_INSERT);
		msg.setObj(obj);
		msg.setNumber(number);
		msg.setExtend(extend);
		try {
			//同步通知
			noticeInsert(msg.getObj(), msg.getExtend(), msg.getNumber(), false);
			//异步通知
			msgqueue.put(msg);
		} catch (Exception e) {
			String str = "发布者(%s)通知insert事件异常,参数:obj=%s,number=%s";
			try {
				str = String.format(str, this.getName(),SerializerUtil.serialize(obj),SerializerUtil.serialize(number));
			} catch (Exception e2) {
				str = String.format(str, this.getName(),obj,number);
			}
			logger.error(str,e);
		}
	}
	
	public void delete(T obj, OptSerialNum number) {
		runThread();
		SubscriberMsg<T> msg = new SubscriberMsg<T>();
		msg.setType(SubscriberMsg.TYPE_DELETE);
		msg.setObj(obj);
		msg.setNumber(number);
		try {
			//同步通知
			noticeDelete(msg.getObj(), msg.getNumber(), false);
			//异步通知
			msgqueue.put(msg);
		} catch (Exception e) {
			String str = "发布者(%s)通知delete事件异常,参数:obj=%s,number=%s";
			try {
				str = String.format(str, this.getName(),SerializerUtil.serialize(obj),SerializerUtil.serialize(number));
			} catch (Exception e2) {
				str = String.format(str, this.getName(),obj,number);
			}
			logger.error(str,e);
		}
	};
	
	
	public void update(T obj, java.util.Map<String,String> extend, java.util.Map<String,Object> change, OptSerialNum number) {
		runThread();
		SubscriberMsg<T> msg = new SubscriberMsg<T>();
		msg.setType(SubscriberMsg.TYPE_UPDATE);
		msg.setObj(obj);
		msg.setNumber(number);
		msg.setExtend(extend);
		msg.setChange(change);
		
		try {
			//同步通知
			noticeUpdate(msg.getObj(), msg.getExtend(), msg.getChange(), msg.getNumber(), false);
			//异步通知
			msgqueue.put(msg);
		} catch (Exception e) {
			String str = "发布者(%s)通知update事件异常,参数:obj=%s,number=%s";
			try {
				str = String.format(str, this.getName(),SerializerUtil.serialize(obj),SerializerUtil.serialize(number));
			} catch (Exception e2) {
				str = String.format(str, this.getName(),obj,number);
			}
			logger.error(str,e);
		}
	};
	
	/**
	 * 检测线程是否启动,如果未启动则启动线程
	 * @author chenli
	 * @data 2016-10-9
	 */
	private void runThread(){
		if(publisherThread == null || publisherThread.isAlive() == false){
			try {
				msglock.lock();
				if(publisherThread == null || publisherThread.isAlive() == false){
					publisherThread = new Thread(this,this.getName());
					publisherThread.start();
				}
			} catch (Exception e) {
				logger.error("发布者1【{}】线程启动失败", this.getName(),e);
			}finally{
				msglock.unlock();
			}
		}
		if(publisherThread2 == null || publisherThread2.isAlive() == false){
			try {
				msglock2.lock();
				if(publisherThread2 == null || publisherThread2.isAlive() == false){
					publisherThread2 = new Thread(this,this.getName());
					publisherThread2.start();
				}
			} catch (Exception e) {
				logger.error("发布者2【{}】线程启动失败", this.getName(),e);
			}finally{
				msglock2.unlock();
			}
		}
	}
	
	/**
	 * 通知订阅者。新增事件
	 * @param obj 对象
	 * @param extend 对象扩展
	 * @param number 操作号
	 * @param isasync 是否异步
	 * @author chenli
	 * @data 2016-10-10
	 */
	protected void noticeInsert(T obj, Map<String,String> extend, OptSerialNum number, boolean isasync) {
		ISubscriber<T>[] subs = getSubscribers();
		for (ISubscriber<T> subscriber : subs) {
			try {
				if(subscriber.isAsync() == isasync){
					//long t1 = System.currentTimeMillis();
					subscriber.insert(obj, extend, number);
					//long t2 = System.currentTimeMillis() - t1;
					/*if(t2 > 500){
						logger.error(subscriber.getName() + " run too long:" + t2);
					}*/
				}
			} catch (Exception e) {
				String str = "订阅者subscriber=%s处理insert事件异常,参数:obj=%s,number=%s";
				try {
					str = String.format(str, subscriber.getName(),SerializerUtil.serialize(obj),SerializerUtil.serialize(number));
				} catch (Exception e2) {
					str = String.format(str, subscriber.getName(),obj,number);
				}
				logger.error(str,e);
			}
		}
	}
	
	/**
	 * 通知订阅者删除事件
	 * @param obj 对象
	 * @param extend 对象扩展
	 * @param number 操作号
	 * @param isasync 是否异步
	 * @author chenli
	 * @data 2016-10-10
	 */
	protected void noticeDelete(T obj, OptSerialNum number, boolean isasync) {
		ISubscriber<T>[] subs = getSubscribers();
		for (ISubscriber<T> subscriber : subs) {
			try {
				if(subscriber.isAsync() == isasync){
					//long t1 = System.currentTimeMillis();
					subscriber.delete(obj,number);
					//long t2 = System.currentTimeMillis() - t1;
					/*if(t2 > 500){
						logger.error(subscriber.getName() + " run too long:" + t2);
					}*/
				}
			} catch (Exception e) {
				String str = "订阅者subscriber=%s处理delete事件异常,参数:obj=%s,number=%s";
				try {
					str = String.format(str, subscriber.getName(),SerializerUtil.serialize(obj),SerializerUtil.serialize(number));
				} catch (Exception e2) {
					str = String.format(str, subscriber.getName(),obj,number);
				}
				logger.error(str,e);
			}
		}
	}
	
	/**
	 * 通知订阅者更新事件
	 * @param obj 对象
	 * @param extend 对象扩展
	 * @param number 操作号
	 * @param isasync 是否异步
	 * @author chenli
	 * @data 2016-10-10
	 */
	protected void noticeUpdate(T obj, Map<String,String> extend, Map<String,Object> change, OptSerialNum number, boolean isasync) {
		ISubscriber<T>[] subs = getSubscribers();
		for (ISubscriber<T> subscriber : subs) {
			try {
				if(subscriber.isAsync() == isasync){
					//long t1 = System.currentTimeMillis();
					subscriber.update(obj, extend, change, number);
					//long t2 = System.currentTimeMillis() - t1;
					/*if(t2 > 500){
						logger.error(subscriber.getName() + " run too long:" + t2);
					}*/
				}
			} catch (Exception e) {
				String str = "订阅者subscriber=%s处理update事件异常,参数:id=%s,change=%s,number=%s";
				Serializable id = null;
				if(obj != null){
					id = obj.getId();
				}
				try {
					str = String.format(str, subscriber.getName(),id,SerializerUtil.serialize(change),SerializerUtil.serialize(number));
				} catch (Exception e2) {
					str = String.format(str, subscriber.getName(),id,change,number);
				}
				logger.error(str,e);
			}
		}
	}
	
	@Override
	public boolean isAsync() {
		return true;
	}
	
	@Override
	public int priority() {
		return PRIORITY_DEAULT;
	}
	
	@Override
	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}
	
	public String getDesc() {
		return desc;
	}
	public void setDesc(String desc) {
		this.desc = desc;
	}
}

订阅者

订阅者抽象为一类,可进行多扩展

java
/**
 * 用户角色订阅
 * @author wuzm
 *
 */
public abstract class AbstractUserRoleSubscriber implements ISubscriber<UserRole>,IPriority{
	/**
	 * 默认优先级5.
	 * 如果需要设置优先级,子类请重写该方法。
	 */
	public int priority() {
		return PRIORITY_DEAULT;
	}
	
	/**
	 * 默认使用异步通知
	 */
	@Override
	public boolean isAsync() {
		return true;
	}
}

二、方式二

定义事件和事件监听,通过事件发布控制程序发布,通过线程池执行特定的事件监听。全局发布订阅使用同一个控制程序。

事件接口

java
/**
 * 事件模型接口
 * @author Brack.zhu
 * @date 2020年12月2日
 */
public interface Event {
	
	/**
	 * 事件发起时间
	 */
	long time=System.currentTimeMillis();

}

事件监听接口

java
/**
 * 事件监听接口
 * @author Brack.zhu
 * @date 2020年12月2日
 */
public interface EventListener<E extends Event> {
	
	/**
	 * 事件触发
	 * @param event
	 */
	void onEvent(E event);

	
}

监听执行方法抽象类

java
/**
 * 监听抽象类,默认注册到事件工具类中
 * @author Brack.zhu
 * @date 2020年12月3日
 * @param <E>
 */
public abstract class AbstEventListener<E extends Event> implements EventListener<E> {

	public AbstEventListener() {
		EventHelper.registerEventListener(this);
	}
}

事件工具类

java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.edcall.module.device.util.ClassUtil;

/**
 * 事件工具类
 * 
 * @author Brack.zhu
 * @date 2020年12月2日
 */
public class EventHelper {

	private static HashMap<String, List<EventListener<? extends Event>>> eventListeners = new HashMap<>();

	private static ExecutorService executors = Executors.newCachedThreadPool();

	private static Logger log = LoggerFactory.getLogger(EventHelper.class);

	/**
	 * 发布事件
	 * 
	 * @param <E>
	 * @param event
	 */
	public static void publish(Event event) {
		if (null == event) {
			return;
		}
		List<EventListener<?>> listeners = eventListeners.get(event.getClass().getSimpleName());
		if (null == listeners || listeners.size() == 0) {
			return;
		}
		executors.execute(new Runnable() {

			@SuppressWarnings({ "rawtypes", "unchecked" })
			@Override
			public void run() {
				for (EventListener eventListener : listeners) {
					try {
						eventListener.onEvent(event);
					} catch (Exception e) {
						log.error("", e);
					}
				}
			}
		});
	}

	/**
	 * 注册事件监听
	 * 
	 * @param <E>
	 * @param event
	 */
	public static <E extends Event> void registerEventListener(EventListener<E> event) {
		Class<?> genericsClazz = ClassUtil.getClassGenericsType(event, 0);
		String simpleName = genericsClazz.getSimpleName();
		synchronized (simpleName) {
			List<EventListener<? extends Event>> events = eventListeners.get(simpleName);
			if (null == events) {
				events = new LinkedList<>();
				eventListeners.put(simpleName, events);
			}
			events.add(event);
		}
	}

}

使用

java
实现Event接口标识事件类型
新建一个类继承监听实现抽象类AbstEventListener,反省未对应的Event事项类
    使用
    在方法中新建特定事件监听类,(构造方法中会将事件注册至EventHelper类中),使用EventHelper中的publish方法会通过线程池调用监听器中的实现逻辑。